#!/usr/bin/python
#-*- coding: utf-8 -*-
# 抽取ActiveMQ数据到redis_mq模块
# 作者：王成
# 日期：2017-04-14
import MySQLdb
import redis
import requests
import json
import yaml,logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import time,timeit,datetime,sys,os,threading
import Queue
from daemon import Daemon
import traceback
import StringIO
import pyactivemq
from pyactivemq import ActiveMQConnectionFactory
from pyactivemq import AcknowledgeMode
from pyactivemq import CMSException
import pickle
import pylru
reload(sys)
sys.setdefaultencoding('utf-8')

class MessageListener(pyactivemq.MessageListener):
    """pyactivemq message listener that hands the text of every message to a queue."""

    def __init__(self, queue):
        """Store the destination queue.

        queue -- object exposing ``put``; it receives the text of each message.
        """
        pyactivemq.MessageListener.__init__(self)
        self.queue = queue

    def onMessage(self, message):
        """Echo the message text to stdout and enqueue it."""
        text = message.text
        print(text)
        self.queue.put(text)
        
class ExceptionListener(pyactivemq.ExceptionListener):
    """pyactivemq exception listener that records connection-level exceptions."""

    def onException(self, ex):
        """Log the exception reported by the connection.

        ex -- the exception object passed to the listener.

        The original signature omitted ``self``, so invoking the callback on
        an instance with the exception argument raised TypeError instead of
        reporting the error.
        """
        logging.error('ActiveMQ连接异常: %s', ex)
        
class MyDaemon(Daemon):
    def _connect_mysql(self):
        """Open a new connection to the database described by config['mysql_web']."""
        cfg = self.config['mysql_web']
        return MySQLdb.connect(host=cfg['host'], port=cfg['port'], user=cfg['user'],
                               passwd=cfg['password'], db=cfg['database'], charset="utf8")

    def execute_sql(self,sql,action='select'):
        """Run one SQL statement on the shared MySQL connection.

        The connection is created lazily and re-opened when ``ping`` fails.

        sql    -- complete SQL statement text.
        action -- 'select': return all rows as a sequence of dicts;
                  'insert': return the auto-increment id of the new row;
                  anything else (e.g. 'update'): return the value of
                  ``cursor.execute`` (affected row count).

        Never raises: on any error the exception is logged and a neutral
        value is returned ([] for 'select', 0 for 'insert', None otherwise).
        """
        try:
            # Tolerate callers that have not initialised self.db yet.
            if getattr(self, 'db', None) is None:
                self.db = self._connect_mysql()
            else:
                try:
                    self.db.ping()
                except MySQLdb.Error:
                    # Stale or dropped connection: replace it.
                    self.db = self._connect_mysql()

            cursor = self.db.cursor(MySQLdb.cursors.DictCursor)
            try:
                result = cursor.execute(sql)
                if action == 'select':
                    result = cursor.fetchall()
                else:
                    if action == 'insert':
                        # Must be read before commit(); the id belongs to the
                        # statement that was just executed.
                        result = self.db.insert_id()
                    # MySQLdb connections start with autocommit disabled, so
                    # writes have to be committed explicitly to be persisted
                    # on transactional tables.
                    self.db.commit()
            finally:
                # Release the cursor even when execute/fetch raised.
                cursor.close()
            return result
        except Exception as e:
            logging.exception('连接数据库时错误: %s', str(e))
            if action == 'select':
                return []
            if action == 'insert':
                return 0
            return None
   
    def _sql_quote(self, value):
        """Escape ``value`` so it can sit inside a single-quoted SQL string literal.

        Unicode is encoded to UTF-8 first (the connection charset used by this
        daemon); other values are converted with ``str``.
        """
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        return MySQLdb.escape_string(str(value))

    def _refresh_dict_cache(self, kind):
        """Trigger the local HTTP service to rebuild the ``kind`` ID mapping and reload it.

        kind -- 'location' or 'device'; selects the ``update_<kind>_id`` and
                ``make_<kind>_dict_cache`` actions.
        """
        # NOTE(review): these calls have no timeout, so a hung service blocks
        # message processing — consider passing timeout=... after confirming
        # how long the rebuild may take.
        requests.get('http://127.0.0.1:9002/?format=json&action=update_%s_id' % kind)
        requests.get('http://127.0.0.1:9002/?format=json&action=make_%s_dict_cache' % kind)
        self.load_cache()

    def format_msg(self,msg):
        """Convert one comma-separated pass record into a row dict.

        msg -- raw message text; at least 24 comma-separated fields are
               required. Fields used (0-based): 5 plate, 6 plate type,
               7 location id, 8 location name, 10 device id, 11 lane,
               12 speed, 13 capture time, 15 direction, 16 image URL,
               23 plate colour.

        Location and device IDs are translated through the cached mapping
        dictionaries. An unknown ID is inserted into ``mon_location`` /
        ``mon_device``, the mapping caches are rebuilt and the lookup is
        retried; if it is still unknown the original ID is kept.

        Returns the row dict, or None when the message is too short or any
        error occurs (the error is logged).
        """
        try:
            tmp = msg.split(',')
            if len(tmp) < 24:
                logging.error('信息长度不正确: %s', msg)
                return None

            row = {}
            plate = tmp[5]
            # Too-short plates and the feed's "not recognised / none" markers
            # are normalised to a single placeholder value.
            if len(plate) < 7 or plate in ('未识别', '无', '无车牌'):
                row['license_plate'] = '无牌'
            else:
                row['license_plate'] = unicode(plate.strip(), errors='ignore')
            row['plate_type_id'] = tmp[6]
            row['region_id'] = '370211'
            row['location_id'] = tmp[7].strip("'")
            row['loc_id'] = tmp[7].strip("'")
            row['location_name'] = unicode(tmp[8], errors='ignore')
            row['device_id'] = tmp[10]
            row['dev_id'] = tmp[10]
            row['lane_id'] = tmp[11]
            row['speed'] = tmp[12]
            row['capture_time'] = tmp[13]
            row['direction_id'] = tmp[15]
            row['image_url'] = tmp[16]
            row['plate_color'] = tmp[23]

            if row['location_id'] in self.location_to_yisa_location:
                row['location_id'] = self.location_to_yisa_location[row['location_id']]
            else:
                logging.error(tmp[7])
                logging.error(row['location_id'])
                # Values originate from the message feed, so escape them
                # before embedding them in the statement.
                sql = "INSERT INTO mon_location SET location_name = '%s',loc_id='%s',region_code='%s';" % (
                    self._sql_quote(row['location_name']),
                    self._sql_quote(row['location_id']),
                    self._sql_quote(row['region_id']))
                insert_id = self.execute_sql(sql, "insert")
                if insert_id > 0:
                    # Logged (rather than printed) so the record reaches the
                    # daemon's log handlers.
                    logging.info('%s %s %s', insert_id, row['location_name'], row['location_id'])
                    self._refresh_dict_cache('location')
                    if row['location_id'] in self.location_to_yisa_location:
                        row['location_id'] = self.location_to_yisa_location[row['location_id']]

            if row['device_id'] in self.device_to_yisa_device:
                row['device_id'] = self.device_to_yisa_device[row['device_id']]
            else:
                # NOTE(review): row['location_id'] may already have been
                # replaced by the mapped ID at this point, whereas the
                # location INSERT stores the unmapped ID in loc_id — confirm
                # whether row['loc_id'] was intended here.
                sql = "INSERT INTO mon_device SET dev_id = '%s',loc_id='%s',region_code='%s';" % (
                    self._sql_quote(row['device_id']),
                    self._sql_quote(row['location_id']),
                    self._sql_quote(row['region_id']))
                insert_id = self.execute_sql(sql, "insert")
                if insert_id > 0:
                    logging.info('%s %s %s', insert_id, row['device_id'], tmp[7])
                    self._refresh_dict_cache('device')
                    if row['device_id'] in self.device_to_yisa_device:
                        row['device_id'] = self.device_to_yisa_device[row['device_id']]
            return row
        except Exception as e:
            logging.exception('格式化信息时错误: %s', str(e))
            return None
            
    def load_cache(self):
        """Reload the ID mapping dictionaries from the pickle files in ./data.

        Sets ``self.device_to_yisa_device`` and
        ``self.location_to_yisa_location``. Each one is first reset to an
        empty dict and then replaced by the unpickled content of its file
        when that file exists. The files are read from the directory of this
        script; they are unpickled without validation.
        """
        data_dir = os.path.dirname(os.path.abspath(__file__)) + "/data/"

        # Third-party device ID <=> Yisa device ID mapping.
        self.device_to_yisa_device = {}
        device_path = data_dir + "device_dict.dat"
        if os.path.isfile(device_path):
            with open(device_path) as fh:
                raw = fh.read()
                self.device_to_yisa_device = pickle.loads(raw)

        # Third-party location (checkpoint) ID <=> Yisa location ID mapping.
        self.location_to_yisa_location = {}
        location_path = data_dir + "location_dict.dat"
        if os.path.isfile(location_path):
            with open(location_path) as fh:
                raw = fh.read()
                self.location_to_yisa_location = pickle.loads(raw)

        # The Yisa device -> Yisa location mapping (device_to_location_dict.dat)
        # and the Yisa location -> Yisa region mapping are not loaded here.
        
    def run(self):      
        """Daemon main loop: consume ActiveMQ messages and push them into redis.

        Steps:
          1. Load ``config.yaml`` from this script's directory.
          2. Attach a rotating file handler (/var/log/yisa_get_msg.log,
             128 MiB per file, 7 backups) to the root logger.
          3. Load the ID mapping caches.
          4. Loop forever: connect to the ActiveMQ broker, subscribe to the
             topic, convert each message with ``format_msg``, drop messages
             whose image URL was already seen (LRU cache of 50000 entries)
             and queue the rest, JSON-encoded, on a redis pipeline.

        Returns on KeyboardInterrupt. Errors while connecting or consuming
        are logged and retried after 10 seconds; any error outside that loop
        is logged and the process exits with status 0.
        """
        config_file = open(os.path.dirname(os.path.abspath(__file__)) + '/config.yaml')
        self.config = yaml.safe_load(config_file)
        config_file.close()
        
        name = 'yisa_get_msg' 
        logging.basicConfig(level=logging.INFO) 
        handler = RotatingFileHandler('/var/log/%s.log' % name, maxBytes=134217728, backupCount=7)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logging.getLogger('').addHandler(handler)
        #------------------- also echo output to the console (disabled) -------------------
        # console = logging.StreamHandler()
        # console.setLevel(logging.INFO)
        # formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        # console.setFormatter(formatter)
        # logging.getLogger().addHandler(console)
        #-------------------------------------------------------   
        logging.warning('启动 [%s]', name)
        # NOTE(review): concurrency_lock is created but not used anywhere in this method.
        concurrency_lock=threading.BoundedSemaphore(value=self.config['yisa_get_msg']['thread_num'])                  
        self.load_cache()
        
        # NOTE(review): the only use of MSG_QUEEN below is commented out.
        self.MSG_QUEEN = Queue.Queue(0)
        try:
            # Connection is opened lazily by execute_sql.
            self.db = None
            r = redis.StrictRedis(unix_socket_path=self.config['redis_mq']['unix_socket_path'])
            # Created once, outside the reconnect loop, so commands still queued
            # when the connection drops remain in the pipeline.
            pipe = r.pipeline()
            # Recently seen image URLs -> number of times seen (duplicate filter).
            cache = pylru.lrucache(50000)
            # Outer loop: (re)connect to ActiveMQ.
            while 1:
                try:
                    factory = ActiveMQConnectionFactory('failover:(tcp://172.22.91.55:61616)','hiscp', 'hiscp')
                    logging.warning('创建连接ActiveMQ...')
                    conn = factory.createConnection()
                    exlistener = ExceptionListener()
                    conn.exceptionListener = exlistener
                    logging.warning('创建会话ActiveMQ...')
                    session = conn.createSession(AcknowledgeMode.AUTO_ACKNOWLEDGE)
                    logging.warning('创建主题ActiveMQ...')
                    topic = session.createTopic('HISCP.HISENSE.PASS.PASSINF')
                    logging.warning('创建消费者ActiveMQ...')
                    consumer = session.createConsumer(topic)
                    logging.warning('启动连接ActiveMQ...')
                    conn.start()
                    logging.warning('连接ActiveMQ成功,开始接收消息')
                    # Reference time for the "flush after 1 second" check below.
                    start = timeit.default_timer()
                    # NOTE(review): LOC is never read in this method.
                    LOC = {}
                    # CMSExceptions seen since the last successfully received message.
                    error_times = 0
                    # Inner loop: receive and forward messages on this connection.
                    while 1:
                        try:
                            # NOTE(review): unit of the timeout is not visible here — confirm against pyactivemq.
                            msg = consumer.receive(timeout=3)
                        except CMSException, e:
                            error_times+=1
                            # More than 5 receive errors without a message in between:
                            # leave the inner loop so the outer loop reconnects.
                            if error_times>5:
                                break
                        else:
                            if msg:
                                error_times = 0
                                row = self.format_msg(msg.text)
                                #logging.error(row)
                                if row:
                                    try:
                                        #print row
                                        # Only image URLs longer than 10 characters take part in de-duplication.
                                        if len(row['image_url'])>10:
                                            if cache.get(row['image_url']) is None:
                                                cache[row['image_url']] = 1
                                            else:# filter out duplicate messages
                                                # Per-day counter of rejected duplicates.
                                                pipe.incr(time.strftime("%Y%m%d",time.localtime(time.time()))+":HISENSE:ERR")
                                                cache[row['image_url']] += 1
                                                logging.warning('重复数据[%s][%d]',row['image_url'],cache[row['image_url']])
                                                # Note: this also skips the flush check at the end of the loop body.
                                                continue
                                        # Per-day counter of accepted messages.
                                        pipe.incr(time.strftime("%Y%m%d",time.localtime(time.time()))+":HISENSE")
                                        pipe.rpush(self.config['redis_mq']['queue_key_name'], json.dumps(row))#,ensure_ascii=False
                                        # NOTE(review): two commands were queued just above, so the
                                        # length looks like it can never be exactly 1 here, which
                                        # would mean `start` is never reset — confirm.
                                        if pipe.__len__()==1:
                                            start = timeit.default_timer()
                                        #self.MSG_QUEEN.put(row)
                                    except UnicodeDecodeError,ude:
                                        logging.error('编码json时错误: %s',row['license_plate'])
                        # Flush to redis once more than 1 second has passed since `start`
                        # or more than 100 entries are queued on the pipeline.
                        if timeit.default_timer()-start>1 or pipe.__len__()>100:
                            pipe.execute()
                    # NOTE(review): the connection is not closed before reconnecting (close() is disabled).
                    #conn.close()
                    logging.warning('关闭ActiveMQ...')
                except KeyboardInterrupt:
                    logging.error('Ctrl+C,终止运行')
                    return
                except Exception, e:
                    logging.exception('连接ActiveMQ时错误: %s', str(e))
                    # Back off before the next connection attempt.
                    time.sleep(10)
        except Exception, e:
            logging.exception('取数据时错误: %s', str(e))
            sys.exit(0)
    
if __name__ == "__main__":
    # Command-line entry point: exactly one argument selecting the daemon action.
    daemon = MyDaemon('/var/run/yisa_get_msg.pid')
    # A direct daemon.run() followed by sys.exit(0) was left disabled here.
    argv = sys.argv
    if len(argv) != 2:
        print("usage: %s start|stop|restart" % argv[0])
        sys.exit(2)

    command = argv[1]
    if command not in ('start', 'stop', 'restart'):
        print("Unknown command")
        sys.exit(2)

    # The command name matches the daemon method to invoke.
    getattr(daemon, command)()
    sys.exit(0)



'''
VMKS,2.5,0002,049c5be29e7647858d999b19c4200d74,02,冀J026BH,02,711542002002,澎湖岛街海河路西,1,370211001000029114,02,0,2017-04-13 16:49:34,370211001000,2,http://10.49.217.87:7017/image/vrb2/i2/d1fc4050c1c8432c8bddcc60f7bd62b9/00015?key=48c2&offset=838928378&high=372193,,,J,001,01,262/295/38/9/1,2,2017-04-13 16:49:34,K33,000,00,2,2,1,1,0,0,0,1
VMKS,2.5,0016,c160a2b28bec4cdb9ff2526b56169b62,09,鲁BR91L1,02,611201009300,昆仑山路前湾港路北300米,1,370211002000233011,02,32,2017-04-13 16:49:36,370211002000,4,http://10.49.217.87:7001/image/vrb2/i2/c71625765d8f422a805451422ac99355/00055?key=9c6&offset=2126852738&high=308273,http://10.49.217.87:7001/image/vrb2/i2/c71625765d8f422a805451422ac99355/00055?key=9c5&offset=2126848545&high=4113,,Z,000,01,637/521/112/32/1,2,2017-04-13 16:49:36,K33,000,00,0,0,0,0,0,0,0,1
'''
